{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "e87d9fcc-4774-44fc-8690-9af0ad92e2d6",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running on local URL:  http://127.0.0.1:7860\n",
      "Running on public URL: https://a3a5ff8f7a21749594.gradio.live\n",
      "\n",
      "This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)\n",
      "<IPython.core.display.HTML object>\n"
     ]
    },
    {
     "data": {
      "text/plain": []
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from typing import List, Optional, Tuple, Dict\n",
    "History = List[Tuple[str, str]]\n",
    "Messages = List[Dict[str, str]]\n",
    "\n",
    "import enum\n",
    "from dataclasses import dataclass\n",
    "from typing import List, Dict, Any, Optional, Tuple\n",
    "from collections import defaultdict\n",
    "\n",
    "\n",
    "@dataclass(frozen=True)\n",
    "class Action:\n",
    "    value: str  # LM returned string for now\n",
    "    use_tool: bool  # if use_tool == False -> propose answer\n",
    "    error: Optional[str] = None\n",
    "\n",
    "def lm_output_to_action(lm_output: str) -> Action:\n",
    "    propose_solution = bool(\"<solution>\" in lm_output)\n",
    "    return Action(lm_output, not propose_solution)\n",
    "\n",
    "from typing import Mapping\n",
    "import re\n",
    "import signal\n",
    "from contextlib import contextmanager\n",
    "from IPython.core.interactiveshell import InteractiveShell\n",
    "from IPython.utils import io\n",
    "from typing import Any\n",
    "\n",
    "from abc import ABC, abstractmethod\n",
    "from typing import Any\n",
    "\n",
    "\n",
    "class Tool(ABC):\n",
    "    \"\"\"Abstract class for a tool.\"\"\"\n",
    "\n",
    "    name: str\n",
    "    signature: str\n",
    "    description: str\n",
    "\n",
    "    @abstractmethod\n",
    "    def __call__(self, *args: Any, **kwds: Any) -> str:\n",
    "        \"\"\"Execute the tool with the given args and return the output.\"\"\"\n",
    "        # execute tool with abitrary args\n",
    "        pass\n",
    "\n",
    "    def reset(self) -> None:\n",
    "        \"\"\"Reset the tool to its initial state.\"\"\"\n",
    "        pass\n",
    "\n",
    "\n",
    "class PythonREPL(Tool):\n",
    "    \"\"\"A tool for running python code in a REPL.\"\"\"\n",
    "\n",
    "    name = \"PythonREPL\"\n",
    "    # This PythonREPL is not used by the environment; It is THE ENVIRONMENT.\n",
    "    signature = \"NOT_USED\"\n",
    "    description = \"NOT_USED\"\n",
    "\n",
    "    def __init__(\n",
    "        self,\n",
    "        user_ns: Mapping[str, Any],\n",
    "        timeout: int = 30,\n",
    "    ) -> None:\n",
    "        super().__init__()\n",
    "        self.user_ns = user_ns\n",
    "        self.timeout = timeout\n",
    "        self.reset()\n",
    "\n",
    "    @contextmanager\n",
    "    def time_limit(self, seconds):\n",
    "        def signal_handler(signum, frame):\n",
    "            raise TimeoutError(f\"Timed out after {seconds} seconds.\")\n",
    "\n",
    "        signal.signal(signal.SIGALRM, signal_handler)\n",
    "        signal.alarm(seconds)\n",
    "        try:\n",
    "            yield\n",
    "        finally:\n",
    "            signal.alarm(0)  # Disable the alarm\n",
    "\n",
    "    def reset(self) -> None:\n",
    "        InteractiveShell.clear_instance()\n",
    "        self.shell = InteractiveShell.instance(\n",
    "            # NOTE: shallow copy is needed to avoid\n",
    "            # shell modifying the original user_ns dict\n",
    "            user_ns=dict(self.user_ns),\n",
    "            colors=\"NoColor\",\n",
    "        )\n",
    "\n",
    "    def __call__(self, query: str) -> str:\n",
    "        \"\"\"Use the tool and return observation\"\"\"\n",
    "        with io.capture_output() as captured:\n",
    "            _ = self.shell.run_cell(query, store_history=True)\n",
    "        output = captured.stdout\n",
    "\n",
    "        if output == \"\":\n",
    "            output = \"[Executed Successfully with No Output]\"\n",
    "\n",
    "        # replace potentially sensitive filepath\n",
    "        # e.g., File /mint/mint/tools/python_tool.py:30, in PythonREPL.time_limit.<locals>.signal_handler(signum, frame)\n",
    "        # with File <filepath>:30, in PythonREPL.time_limit.<locals>.signal_handler(signum, frame)\n",
    "        # use re\n",
    "        output = re.sub(\n",
    "                # r\"File (/mint/)mint/tools/python_tool.py:(\\d+)\",\n",
    "                r\"File (.*)mint/tools/python_tool.py:(\\d+)\",\n",
    "                r\"File <hidden_filepath>:\\1\",\n",
    "                output,\n",
    "            )\n",
    "        if len(output) > 2000:\n",
    "            output = output[:2000] + \"...\\n[Output Truncated]\"\n",
    "\n",
    "        return output\n",
    "\n",
    "class ParseError(Exception):\n",
    "    pass\n",
    "\n",
    "def parse_action(action: Action) -> Tuple[str, Dict[str, Any]]:\n",
    "    \"\"\"Define the parsing logic.\"\"\"\n",
    "    lm_output = \"\\n\" + action.value + \"\\n\"\n",
    "    output = {}\n",
    "    try:\n",
    "        if not action.use_tool:\n",
    "            answer = \"\\n\".join(\n",
    "                [\n",
    "                    i.strip()\n",
    "                    for i in re.findall(\n",
    "                        r\"<solution>(.*?)</solution>\", lm_output, re.DOTALL\n",
    "                    )\n",
    "                ]\n",
    "            )\n",
    "            if answer == \"\":\n",
    "                raise ParseError(\"No answer found.\")\n",
    "            output[\"answer\"] = answer\n",
    "        else:\n",
    "            env_input = \"\\n\".join(\n",
    "                [\n",
    "                    i.strip()\n",
    "                    for i in re.findall(\n",
    "                        r\"<execute>(.*?)</execute>\", lm_output, re.DOTALL\n",
    "                    )\n",
    "                ]\n",
    "            )\n",
    "            if env_input == \"\":\n",
    "                raise ParseError(\"No code found.\")\n",
    "            output[\"env_input\"] = env_input\n",
    "    except Exception as e:\n",
    "        raise ParseError(e)\n",
    "    return output\n",
    "\n",
    "python_repl = PythonREPL(\n",
    "            user_ns={},\n",
    "        )\n",
    "\n",
    "import gradio as gr\n",
    "import llama_cpp\n",
    "import llama_cpp.llama_tokenizer\n",
    "import torch\n",
    "\n",
    "if torch.cuda.is_available():\n",
    "  CodeActAgent_llm = llama_cpp.Llama.from_pretrained(\n",
    "      repo_id=\"xingyaoww/CodeActAgent-Mistral-7b-v0.1.q8_0.gguf\",\n",
    "      filename=\"*q8_0.gguf\",\n",
    "      verbose=False,\n",
    "      n_gpu_layers = -1,\n",
    "      n_ctx = 3060\n",
    "  )\n",
    "else:\n",
    "  CodeActAgent_llm = llama_cpp.Llama.from_pretrained(\n",
    "      repo_id=\"xingyaoww/CodeActAgent-Mistral-7b-v0.1.q8_0.gguf\",\n",
    "      filename=\"*q8_0.gguf\",\n",
    "      verbose=False,\n",
    "      #n_gpu_layers = -1,\n",
    "      n_ctx = 3060\n",
    "  )\n",
    "\n",
    "system_prompt = '''\n",
    "You are a helpful assistant assigned with the task of problem-solving. To achieve this, you will be using an interactive coding environment equipped with a variety of tool functions to assist you throughout the process.\n",
    "\n",
    "At each turn, you should first provide your step-by-step thinking for solving the task. Your thought process should be enclosed using \"<thought>\" tag, for example: <thought> I need to print \"Hello World!\" </thought>.\n",
    "\n",
    "After that, you have two options:\n",
    "\n",
    "1) Interact with a Python programming environment and receive the corresponding output. Your code should be enclosed using \"<execute>\" tag, for example: <execute> print(\"Hello World!\") </execute>.\n",
    "2) Directly provide a solution that adheres to the required format for the given task. Your solution should be enclosed using \"<solution>\" tag, for example: The answer is <solution> A </solution>.\n",
    "\n",
    "You have {max_total_steps} chances to interact with the environment or propose a solution. You can only propose a solution {max_propose_solution} times.\n",
    "'''.format(\n",
    "    **{\n",
    "        \"max_total_steps\": 5,\n",
    "        \"max_propose_solution\": 2,\n",
    "    }\n",
    ")\n",
    "\n",
    "\n",
    "def exe_to_md(str_):\n",
    "    req = str_.replace(\"<execute>\" ,\"```python\").replace(\"</execute>\" ,\"```\").replace(\"<solution>\" ,\"```python\").replace(\"</solution>\" ,\"```\")\n",
    "    if \"<thought>\" in req and \"def \" in req:\n",
    "        req = req.replace(\"<thought>\" ,\"```python\").replace(\"</thought>\" ,\"```\")\n",
    "    return req\n",
    "\n",
    "def md_to_exe(str_):\n",
    "    return str_.replace(\"```python\", \"<execute>\").replace(\"```\", \"</execute>\")\n",
    "\n",
    "def clear_session() -> History:\n",
    "    return '', []\n",
    "\n",
    "def modify_system_session(system: str) -> str:\n",
    "    if system is None or len(system) == 0:\n",
    "        system = default_system\n",
    "    return system, system, []\n",
    "\n",
    "def history_to_messages(history: History, system: str) -> Messages:\n",
    "    messages = [{'role': \"system\", 'content': system}]\n",
    "    for h in history:\n",
    "        messages.append({'role': \"user\", 'content': h[0]})\n",
    "        if h[1] != \"😊\":\n",
    "            messages.append({'role': \"assistant\", 'content':\n",
    "                md_to_exe(h[1])\n",
    "            })\n",
    "    return messages\n",
    "\n",
    "def messages_to_history(messages: Messages) -> Tuple[str, History]:\n",
    "    assert messages[0]['role'] == \"system\"\n",
    "    system = messages[0]['content']\n",
    "    history = []\n",
    "    import numpy as np\n",
    "    import pandas as pd\n",
    "    from copy import deepcopy\n",
    "    messages = deepcopy(messages)\n",
    "    if messages[-1][\"role\"] == \"user\":\n",
    "        messages += [{\"role\": \"assistant\", \"content\": \"😊\"}]\n",
    "\n",
    "    messages_ = []\n",
    "    for ele in messages[1:]:\n",
    "        if not messages_:\n",
    "            messages_.append(ele)\n",
    "        else:\n",
    "            if messages_[-1][\"role\"] == ele[\"role\"]:\n",
    "                continue\n",
    "            else:\n",
    "                messages_.append(ele)\n",
    "\n",
    "    history = pd.DataFrame(np.asarray(messages_).reshape([-1, 2]).tolist()).applymap(\n",
    "        lambda x: x[\"content\"]\n",
    "    ).applymap(\n",
    "        exe_to_md\n",
    "    ).values.tolist()\n",
    "    return system, history\n",
    "\n",
    "def model_chat(query: Optional[str], history: Optional[History], system: str\n",
    ") -> Tuple[str, str, History]:\n",
    "    if query is None:\n",
    "        query = ''\n",
    "    if history is None:\n",
    "        history = []\n",
    "    messages = history_to_messages(history, system)\n",
    "    if query:\n",
    "        messages.append({'role': \"user\", 'content': query})\n",
    "\n",
    "    response = CodeActAgent_llm.create_chat_completion(\n",
    "        messages=messages,\n",
    "        stream=True,\n",
    "        top_p = 0.9,\n",
    "        temperature = 0.01\n",
    "    )\n",
    "\n",
    "    from IPython.display import clear_output\n",
    "    lm_output = \"\"\n",
    "    for chunk in response:\n",
    "        delta = chunk[\"choices\"][0][\"delta\"]\n",
    "        if \"content\" not in delta:\n",
    "            continue\n",
    "        lm_output += delta[\"content\"]\n",
    "    \n",
    "    lm_output = lm_output.replace(\"<solution>\", \"<execute>\").replace(\"</solution>\", \"</execute>\")\n",
    "\n",
    "    if \"<execute>\" in lm_output:\n",
    "        action_out = lm_output_to_action(lm_output)\n",
    "        parsed = parse_action(action_out)\n",
    "        env_input = parsed[\"env_input\"]\n",
    "        obs = python_repl(env_input).strip()\n",
    "        obs = '''\n",
    "        Observation:\n",
    "        {}\n",
    "        '''.format(obs).strip()\n",
    "\n",
    "        system, history = messages_to_history(messages + [\n",
    "            {'role': \"assistant\",\n",
    "            'content': exe_to_md(lm_output)},\n",
    "            {\n",
    "             'role': \"user\",\n",
    "             \"content\": obs\n",
    "            }\n",
    "        ])\n",
    "    elif \"<thought>\" in lm_output:\n",
    "        system, history = messages_to_history(messages + [\n",
    "            {'role': \"assistant\",\n",
    "            'content': exe_to_md(lm_output)},\n",
    "        ])\n",
    "    else:\n",
    "        system, history = messages_to_history(messages + [\n",
    "            {'role': \"assistant\",\n",
    "            'content': exe_to_md(lm_output)},\n",
    "        ])\n",
    "    return \"\", history, system\n",
    "\n",
    "\n",
    "with gr.Blocks() as demo:\n",
    "    gr.Markdown(\"\"\"<center><font size=8>CodeActAgent Mistral 7B Bot 🤖</center>\"\"\")\n",
    "\n",
    "    with gr.Row():\n",
    "        with gr.Column(scale=3):\n",
    "            system_input = gr.Textbox(value=system_prompt, lines=1, label='System', visible = False)\n",
    "        with gr.Column(scale=1):\n",
    "            modify_system = gr.Button(\"🛠️ Set system prompt and clear history\", scale=2, visible = False)\n",
    "        system_state = gr.Textbox(value=system_prompt, visible=False)\n",
    "    chatbot = gr.Chatbot(label='CodeActAgent-Mistral-7b-v0.1')\n",
    "    textbox = gr.Textbox(lines=2, label='Input')\n",
    "\n",
    "    with gr.Row():\n",
    "        clear_history = gr.Button(\"🧹 Clear History\")\n",
    "        sumbit = gr.Button(\"🚀 Send\")\n",
    "\n",
    "    sumbit.click(model_chat,\n",
    "                 inputs=[textbox, chatbot, system_state],\n",
    "                 outputs=[textbox, chatbot, system_input],\n",
    "                 concurrency_limit = 100)\n",
    "    clear_history.click(fn=clear_session,\n",
    "                        inputs=[],\n",
    "                        outputs=[textbox, chatbot])\n",
    "    modify_system.click(fn=modify_system_session,\n",
    "                        inputs=[system_input],\n",
    "                        outputs=[system_state, system_input, chatbot])\n",
    "\n",
    "    gr.Examples(\n",
    "        [\n",
    "            \"teach me how to use numpy.\",\n",
    "            \"Give me a python function give the divide of number it self 10 times.\",\n",
    "            '''\n",
    "            Plot box plot with pandas and save it to local.\n",
    "            '''.strip(),\n",
    "\n",
    "            '''\n",
    "            Write a python code about, download image to local from url, the format as :\n",
    "            url = f'https://image.pollinations.ai/prompt/{prompt}'\n",
    "            where prompt as the input of download function.\n",
    "            '''.strip(),\n",
    "            \"Use this function download a image of bee.\",\n",
    "\n",
    "            '''\n",
    "            Draw a picture teach me what linear regression is.\n",
    "            '''.strip(),\n",
    "            \"Use more points and draw the image with the line fitted.\",\n",
    "\n",
    "            '''\n",
    "            Write a piece of Python code to simulate the financial transaction process and draw a financial images chart by lineplot of Poisson process.\n",
    "            '''.strip(),\n",
    "            #\"Add monotonic increasing trend on it.\",\n",
    "            \"Add a Trigonometric function loop on it.\",\n",
    "        ],\n",
    "        inputs = textbox,\n",
    "        label = \"Task Prompt: \\n(Used to give the function or task defination on the head)\",\n",
    "    )\n",
    "\n",
    "    gr.Examples(\n",
    "        [\n",
    "            '''\n",
    "            Give me the function defination. 💡\n",
    "            '''.strip(),\n",
    "\n",
    "            '''\n",
    "            Correct it. ☹️❌\n",
    "            '''.strip(),\n",
    "\n",
    "            '''\n",
    "            Save the output as image 🖼️ to local. ⏬\n",
    "            '''.strip(),\n",
    "\n",
    "            '''\n",
    "            Good Job 😊\n",
    "            '''.strip(),\n",
    "        ],\n",
    "        inputs = textbox,\n",
    "        label = \"Action Prompt: \\n(Used to specify downstream actions taken by LLM, such as modifying errors, saving running results locally, saying you did a good job, etc.)\",\n",
    "    )\n",
    "\n",
    "demo.queue(api_open=False)\n",
    "demo.launch(max_threads=30, share = True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "634a7971-b3e0-4130-ac00-43c93c3847eb",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
